-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Core][aDag] Support multi node multi reader #47480
[Core][aDag] Support multi node multi reader #47480
Conversation
@@ -1448,59 +1447,6 @@ def test_driver_and_actor_as_readers(ray_start_cluster): | |||
dag.experimental_compile() | |||
|
|||
|
|||
def test_payload_large(ray_start_cluster): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved to multi node test suite
@@ -16,7 +17,7 @@ | |||
# entry/init points. | |||
logger = logging.getLogger(__name__) | |||
|
|||
DEFAULT_MAX_BUFFER_SIZE = int(100 * 1e6) # 100 mB |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this was a bug
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there's a fundamental fix. I will fix it in a separate PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you mean the buffer size should be 1MB? If that's the case, can you update comment as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, we have this 1MB buffer size
Line 60 in 3e8dd0d
buffer_size_bytes: int = DEFAULT_BUFFER_SIZE_BYTES |
But it was not passed correctly (meainng our default buffer size has been 100mb)
timeout_ms, | ||
) | ||
# TODO(sang): Clean the previous ref that won't be used. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This currently leaks a thread whenever resizing happens. we should fix it
@@ -16,7 +17,7 @@ | |||
# entry/init points. | |||
logger = logging.getLogger(__name__) | |||
|
|||
DEFAULT_MAX_BUFFER_SIZE = int(100 * 1e6) # 100 mB |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you mean the buffer size should be 1MB? If that's the case, can you update comment as well?
_reader_node_ids: Optional[Set["ray.NodeID"]] = None, | ||
_writer_ref: Optional["ray.ObjectRef"] = None, | ||
_reader_ref: Optional["ray.ObjectRef"] = None, | ||
_reader_refs: Optional[Dict[str, "ray.ObjectRef"]] = None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please update docstring?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there's a comment underneath, and given other private args don't have docstring, I will keep it as is. Lmk if you think we should update docstring for all private attr
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I think we should move it to the docstring so that the caller knows what to pass in. There are also args that seem to contain overlapping information, which need clean up or clarification.
self._reader_ref = reader_ref | ||
def __init__( | ||
self, | ||
_node_id_to_reader_info: Dict[str, ReaderInfo] = None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my bad. it should not be None
self._reader_node_ids = _reader_node_ids | ||
self._node_id_to_reader_info = _node_id_to_reader_info | ||
|
||
assert self._num_local_readers == 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a bit weird, it is set to 0 at L233, and asserted here. Why not just set it here?
all comments are addressed. premerge passing |
remote_reader_refs, | ||
remote_reader_node_ids, | ||
remote_reader_ids, | ||
remote_num_readers_per_node, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we define a struct
for each "node reader"? It is less error prone and we don't need the assert on L357
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
cc @ruisearch42 do you think we can merge this today ? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good
std::shared_ptr<std::vector<std::shared_ptr<MutableObjectReaderInterface>>> | ||
remote_readers = | ||
std::make_shared<std::vector<std::shared_ptr<MutableObjectReaderInterface>>>(); | ||
// TODO(sang): Currently, these attributes are not cleaned up. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Which attributes?
RAY_LOG(ERROR) | ||
<< "Failed to transfer object to a remote node for an object id " | ||
<< writer_object_id << ". It can cause hang."; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should have a hard failure here?
self._worker.core_worker.experimental_channel_register_reader( | ||
reader_ref_info.reader_ref, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to assert this is called exactly once?
…)" This reverts commit 57136b5.
This PR supports multi readers in multi nodes. It also adds tests that the feature works with large gRPC payloads and buffer resizing. multi readers in multi node didn't work because the code allows to only register 1 remote reader reference on 1 specific node. This fixes the issues by allowing to register remote reader references in multi nodes. Signed-off-by: ujjawal-khare <[email protected]>
This PR supports multi readers in multi nodes. It also adds tests that the feature works with large gRPC payloads and buffer resizing. multi readers in multi node didn't work because the code allows to only register 1 remote reader reference on 1 specific node. This fixes the issues by allowing to register remote reader references in multi nodes. Signed-off-by: ujjawal-khare <[email protected]>
This PR supports multi readers in multi nodes. It also adds tests that the feature works with large gRPC payloads and buffer resizing. multi readers in multi node didn't work because the code allows to only register 1 remote reader reference on 1 specific node. This fixes the issues by allowing to register remote reader references in multi nodes. Signed-off-by: ujjawal-khare <[email protected]>
This PR supports multi readers in multi nodes. It also adds tests that the feature works with large gRPC payloads and buffer resizing. multi readers in multi node didn't work because the code allows to only register 1 remote reader reference on 1 specific node. This fixes the issues by allowing to register remote reader references in multi nodes. Signed-off-by: ujjawal-khare <[email protected]>
This PR supports multi readers in multi nodes. It also adds tests that the feature works with large gRPC payloads and buffer resizing. multi readers in multi node didn't work because the code allows to only register 1 remote reader reference on 1 specific node. This fixes the issues by allowing to register remote reader references in multi nodes. Signed-off-by: ujjawal-khare <[email protected]>
This PR supports multi readers in multi nodes. It also adds tests that the feature works with large gRPC payloads and buffer resizing. multi readers in multi node didn't work because the code allows to only register 1 remote reader reference on 1 specific node. This fixes the issues by allowing to register remote reader references in multi nodes. Signed-off-by: ujjawal-khare <[email protected]>
This PR supports multi readers in multi nodes. It also adds tests that the feature works with large gRPC payloads and buffer resizing. multi readers in multi node didn't work because the code allows to only register 1 remote reader reference on 1 specific node. This fixes the issues by allowing to register remote reader references in multi nodes. Signed-off-by: ujjawal-khare <[email protected]>
This PR supports multi readers in multi nodes. It also adds tests that the feature works with large gRPC payloads and buffer resizing. multi readers in multi node didn't work because the code allows to only register 1 remote reader reference on 1 specific node. This fixes the issues by allowing to register remote reader references in multi nodes. Signed-off-by: ujjawal-khare <[email protected]>
This PR supports multi readers in multi nodes. It also adds tests that the feature works with large gRPC payloads and buffer resizing. multi readers in multi node didn't work because the code allows to only register 1 remote reader reference on 1 specific node. This fixes the issues by allowing to register remote reader references in multi nodes. Signed-off-by: ujjawal-khare <[email protected]>
Why are these changes needed?
This PR supports multi readers in multi nodes. It also adds tests that the feature works with large gRPC payloads and buffer resizing.
multi readers in multi node didn't work because the code allows to only register 1 remote reader reference on 1 specific node. This fixes the issues by allowing to register remote reader references in multi nodes.
Related issue number
Closes #46269
Checks
git commit -s
) in this PR.scripts/format.sh
to lint the changes in this PR.method in Tune, I've added it in
doc/source/tune/api/
under thecorresponding
.rst
file.